Solutions/Sophos Endpoint Protection/Data Connectors/AzureFunctionSophos/main.py (140 lines of code) (raw):
import os
import logging
import re
import time
import datetime
import calendar
from typing import Callable
import requests
import azure.functions as func
from .sentinel_connector import AzureSentinelConnector
from .state_manager import StateManager
TOKEN = os.environ['SOPHOS_TOKEN']
WORKSPACE_ID = os.environ['WORKSPACE_ID']
SHARED_KEY = os.environ['SHARED_KEY']
FILE_SHARE_CONNECTION_STRING = os.environ['AzureWebJobsStorage']
LOG_TYPE = 'SophosEP'
MAX_SCRIPT_EXEC_TIME_MINUTES = 5
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.ERROR)
LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')
if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
LOG_ANALYTICS_URI = 'https://' + WORKSPACE_ID + '.ods.opinsights.azure.com'
pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure.([a-zA-Z\.]+)$'
match = re.match(pattern, str(LOG_ANALYTICS_URI))
if not match:
raise Exception("Invalid Log Analytics Uri.")
def main(mytimer: func.TimerRequest):
logging.info('Starting script.')
sentinel_connector = AzureSentinelConnector(LOG_ANALYTICS_URI, WORKSPACE_ID, SHARED_KEY, LOG_TYPE, queue_size=1000)
sophos = SophosConnector(
token_string=TOKEN,
file_share_connection_string=FILE_SHARE_CONNECTION_STRING,
sentinel_connector=sentinel_connector
)
sophos.process_alerts()
sophos.process_events()
logging.info('Script finished. Sent items: {}'.format(sophos.sentinel.successfull_sent_events_number))
class Params:
def __init__(self, cursor=None, since=None):
self.cursor = cursor
self.since = since
def to_dict(self) -> dict:
d = dict()
if self.cursor:
d['cursor'] = self.cursor
else:
d['from_date'] = self.since
return d
class Token:
def __init__(self, token_txt):
rex_txt = r"url\: (?P<url>https\://.+), x-api-key\: (?P<api_key>.+), Authorization\: (?P<authorization>.+)$"
rex = re.compile(rex_txt)
m = rex.search(token_txt)
self.url = m.group("url")
self.api_key = m.group("api_key")
self.authorization = m.group("authorization").strip()
class SophosConnector:
def __init__(self, token_string: str, file_share_connection_string: str, sentinel_connector: AzureSentinelConnector):
self._start_time = int(time.time())
self.token = Token(token_string)
self.sentinel = sentinel_connector
self.events_state_manager = StateManager(file_share_connection_string, file_path='events_cursor.txt')
self.alerts_state_manager = StateManager(file_share_connection_string, file_path='alerts_cursor.txt')
self.default_since_time_hours = 24
def _get_params(self, state_manager: StateManager) -> Params:
cursor = state_manager.get()
if cursor:
params = Params(cursor=cursor)
else:
since = int(calendar.timegm(((datetime.datetime.utcnow() - datetime.timedelta(hours=self.default_since_time_hours)).timetuple())))
params = Params(since=since)
return params
def get_events_params(self) -> Params:
return self._get_params(self.events_state_manager)
def get_alerts_params(self) -> Params:
return self._get_params(self.alerts_state_manager)
def _save_cursor(self, cursor: str, state_manager: StateManager) -> None:
if cursor:
state_manager.post(cursor)
def save_events_cursor(self, cursor) -> None:
logging.info(f'saving events cursor {cursor}')
self._save_cursor(cursor, self.events_state_manager)
def save_alerts_cursor(self, cursor) -> None:
logging.info(f'saving alerts cursor {cursor}')
self._save_cursor(cursor, self.alerts_state_manager)
@property
def _headers(self) -> dict:
return {
'Content-Type': 'application/json; charset=utf-8',
'Accept': 'application/json',
'X-Locale': 'en',
'Authorization': self.token.authorization,
'x-api-key': self.token.api_key
}
def _get_url_params(self, params: Params) -> dict:
url_params = {
'limit': 1000
}
url_params.update(params.to_dict())
return url_params
def _process_endpoint(self, endpoint: str, params: Params, record_type: str, save_cursor_func: Callable) -> None:
url = f'{self.token.url}{endpoint}'
while True:
if self.check_if_script_runs_too_long():
logging.info('Script is running too long. Exit script.')
break
url_params = self._get_url_params(params)
logging.info(f'Making request - url: {url}, params: {url_params}')
res = requests.get(url, headers=self._headers, params=url_params)
if not res.ok:
raise Exception(f'Error while obtaining data (response code: {res.status_code}): {res.text}')
res = res.json()
for item in res['items']:
item['datastream'] = record_type
self.sentinel.send(item)
self.sentinel.flush()
next_cursor = res['next_cursor']
save_cursor_func(next_cursor)
if not res['has_more']:
break
else:
params = Params(cursor=next_cursor, since=None)
def process_events(self):
self._process_endpoint(
endpoint='/siem/v1/events',
params=self.get_events_params(),
record_type='event',
save_cursor_func=self.save_events_cursor
)
def process_alerts(self):
self._process_endpoint(
endpoint='/siem/v1/alerts',
params=self.get_alerts_params(),
record_type='alert',
save_cursor_func=self.save_alerts_cursor
)
def check_if_script_runs_too_long(self):
now = int(time.time())
duration = now - self._start_time
max_duration = int(MAX_SCRIPT_EXEC_TIME_MINUTES * 60 * 0.85)
return duration > max_duration